《Effective Python》笔记

Effecitve Python 阅读

《Effective Python》笔记

列表与字典

11.学会对序列做切片

  • 切片要尽可能写的简单,如果从头开始选,就省略起始下标0;如果选到片列末尾,省略终止下标
  • 切片允许起始下标或终止下标越界,所以很容易就能表达“取开头多少个元素”或“取末尾多少个元素”,而不担心切片是否真有那么多元素
  • 把切片放在赋值符号的左侧可以将原列表中这段范围内的元素用赋值符号右侧的元素替换掉,但可能改变原列表的长度

12.不要在切片里同时指定起止下标与步进

  • 同时指定切片的起止下标与步进理解起来会很困难
  • 如果要指定步进值,那就省略起止下标,最好采用正数作为步进值,尽量别用负数
  • 不要把起始位置、终止位置与步进值全都写在同一个切片操作里。如果必须同时使用这三个指标,那就分两次(其中一次隔位选取、另一次做切割),也可以使用itertools中的islice方法

13.通过带星号的unpacking操作来捕获多个元素,不要用切片

  • 拆分数据结构并把其中的数据赋给变量时,可以用带星号的表达式,将结构中无法与普通变量相匹配的内容捕获到一份列表里
  • 这种带星号的表达式可以出现在赋值符号左侧的任意位置,它总是会形成一份含有零个或多个值的列表
  • 这种带星号的unpacking方式比较清晰,通过下标或切片容易出错
    1
    2
    3
    4
    oldest. second_oldest, *others = car_ages_descending
    print(oldest, second_oldest, others)
    >>>
    20 19 [15, 9, 8, 7, 6, 4, 1, 0]

14.用 sort 方法的 key 参数来表示复杂的排序逻辑

  • 凡是具备自然顺序的内置类型几乎都可以用 sort 方法排列,例如字符串、浮点数
  • __repr__ 是 Python 中的一个特殊方法 (special method),用于定义对象的 “official” 字符串表示形式。这个方法会在调用内置函数 repr() 时被调用,或者在交互式环境中直接输出对象时被自动调用。它的作用是返回一个可以用来重新创建对象的字符串表示形式。
  • !r 是一种格式说明符,用于在格式化字符串时表示要对值进行 “转换”。在这种情况下,!r 用于调用 repr() 函数来获得对象的“官方”字符串表示形式。f'Point({self.x!r}, {self.y})' 中的 !r 会确保 self.xself.y 的值都使用它们的官方字符串表示形式,这在创建对象的字符串表示形式时非常有用。
  • 可以把辅助函数传给 sort 方法的 key 参数,让 sort 根据函数逻辑来排列元素顺序,而不是根据元素本身
    1
    tools.sort(key=lambda x:x.name)
  • 排序有很多指标要一句,可以把它们放在一个元组里,让key函数返回这样的元组,如果支持一元减操作符,可以单独给这项指标取反
    1
    tools.sort(key=lambda x:(-x.weight, x.name))
  • 如果这些指标不支持一元减操作符,可以多次调用 sort 方法,并在每次调用时分别指定 key 函数与 reverse 参数。最次要的放第一轮,重要的放最后
    1
    2
    tools.sort(key=lambda x:(x.weight))
    tools.sort(key=lambda x:(x.name, reverse=True))

15. 不要过分依赖字典添加条目时所用的顺序

  • 在python3.5与之前版本,字典不保证迭代顺序与插入顺序一致。因为字典类型以前是用哈希表算法实现的。Python3.6开始,字典会保留这些键值对在添加时所用的顺序,3.7 语言规范正式确立了这条规则。
  • 内置的 collections 模块早就能提供这种保留插入顺序的字典,叫做 OrderDict,和标准的 dict 很像,但是如果频繁插入或弹出键值对,那么 OrderDict 更适合。
  • Python 不是静态类型的语言,大多数代码都以鸭子类型(duck typing)机制运作(也就是说,对象支持什么样的行为,就可以当成什么样的数据使用,而不用执着于它在类体系中的地位)
  • 对于不可变类型(例如整数、浮点数、字符串、元组等),传递的是值的副本,因此类似于深复制的概念。
  • 对于可变类型(例如列表、字典、集合等),传递的是引用,因此类似于浅复制的概念,可能会对原始对象产生影响。
  • 双下划线(例如 __xx__)开头和结尾的方法是特殊方法(也称为魔术方法或双下划线方法),特殊方法是 Python 中用于实现对象特定行为的机制,而普通方法则用于一般的对象行为和逻辑。
  • 如果不想把这种跟标准字典类型很相似的类型也当作标准字典来处理,那么可以考虑三种办法。第一,不要依赖插入时的顺序代码,加一段顺序判断;第二,在程序运行时明确判断它是不是标准字典;第三,给代码添加类型注解并做静态分析
    1
    2
    3
    def popular_rank(votes: Dict[str, int], ranks: Dict[str, int]) -> None:

    python3 -m mypy --strict example.py

16. 用 get 处理键不在字典中的情况,不要使用 in 与 KeyError

  • 一般判断字典里面有没有这个 Key,可以采用 in 或者 KeyError
    1
    2
    3
    4
    5
    6
    7
    8
    9
    if key in counters:
    xxx
    else:
    xxx

    try:
    count = counters[key]
    except KeyError:
    count = 0
  • Python 内置的字典(dict)类型提供了get方法,可以通过第一个参数指定自己想查的键,并通过第二个参数指定这个键不存在时返回的默认值
    1
    count = counters.get(key, 0)
  • dict 类型提供了 setdefault 方法。如果有就返回对应的值,如果没有就先把用户提供的默认值跟这个键关联起来
    1
    2
    names = votes.setdefault(key, [])
    names.append(who)

17. 用defaultdict处理内部状态中缺失的元素,而不用setdefault

  • 如果你管理的字典可能需要添加任意的键,那么应该考虑能否用内置的 collections 模块中的 defaultdict 实例来解决问题
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    from collections import defaultdict

    class Visits:
    def __init__(self):
    self.data = {}

    def add(self, country, city):
    city_set = self.data.setdefault(country, set())
    city_set.add(city)

    class Visits:
    def __init__(self):
    self.data = defaultdict(set)

    def add(self, country, city):
    self.data[country].add(city)

18. 学会利用__missig__构造依赖值的默认值

  • 如果创建默认值需要较大的开销,或者可能抛出异常,那就不适合使用dict类型的setdefault方法实现
  • 传给defaultdict的函数必须是不需要参数的函数,所以无法创建出需要依赖键名的默认值
  • 如果要构造的默认值必须根据键名来确定,那么可以定义自己的dict子类并实现__missing__方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    class Pictures(dict):
    def __missinng__(self, key):
    value = open_picture(key)
    self[key] = valu
    return value
    pictures = Pictures()
    handlie = pictures[path]
    handle.seek(0)
    image_data = handle.read()

函数

19. 不要把函数返回的多个数值拆分到三个以上的变量中

  • 函数可以把多个值合起来通过一个元组返回给调用者,以便利用Python的 unpacking 机制去拆分
  • 对于函数返回的多个值,可以把普通变量没有捕获到的那些值全都捕获到一个带星号的变量中
  • 把返回的值拆分到四个或四个以上的变量很容易出错,所以最好不要啊么些,而是应该通过小类或 namedtuple 实例完成

20. 遇到意外状况时应该抛出异常,不要返回 None

  • 用返回值 None 表示特殊情况时很容易出错的,没办法与0和空白字符串之类的值区分,这些值都相当于 False
  • 用异常表示特殊的情况,而不要返回 None。让调用这个函数的程序根据文档里写的异常情况做出处理
  • 通过类型注解可以明确禁止函数返回 None,即便在特殊情况下,它也不能返回这个值
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    def careful_divide(a, b):
    try:
    return a/b
    except ZeroDivisionError:
    return None

    def careful_divide(a, b):
    try:
    return a/b
    except ZeroDivisionError:
    return False, None

    def careful_divide(a, b):
    try:
    return a/b
    except ZeroDivisionError:
    raise ValueError('Invalid inputs')

    x,y = 5,2
    try:
    result = careful_divide(a,b)
    except ValueError:
    print('Invalid inputs')
    else:
    print(result)

21. 了解如何在闭包里面使用外围作用域中的变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def sort_priority(values, group):
def helper(x):
if x in group:
return (0,x)
return (1,x)
values.sort(key=helper)

numbers = [8,3,1,2,5,4,7,6]
group = {2,3,5,7}
sort_priority(numbers, group)
print(numbers)

>>>
[2,3,5,7,1,4,6,8]
  • 上述写法,为什么能成功
    1. python 支持闭包,大函数里面的小函数也能引用大函数之中的变量
    2. 函数在 Python 里是头等对象,所以你可以像操作其它对象一样,直接引用它们、把它们赋给变量、将它们当成参数传给其他函数,或是在in表达式比较等等。
    3. python 在判断两个序列(包括元组)的大小时,有自己的一套规则。它首先比较0号位置的那两个元素,如果相等,那就比较1号位置的那两个元素;依次类推
  • 在表达式中引用变量时,Python解释器会按照下面顺序,在作用域查找变量
    1)当前函数的作用域
    2)外围作用域(例如包含当前函数的其他函数所对应的作用域)
    3)包含当前代码的那个模块所对应的作用域(也叫全局作用域,global scope)
    4)内置作用域(built-in scope,也就是包含 len 与 str 等函数的那个作用域)
  • 变量赋值不同,如果变量已经定义在当前作用域中,那么直接把新值赋给他就好。如果当前域不存在这个变量,即便外围作用域里有同名的变量,Python也还是会吧这次的赋值当成新变量的定义
  • 上述问题也称作作用域bug,python 是故意这么设计的,防止函数中的局部变量污染外围模块
  • Python 里面有一种特殊的写法,可以把闭包里的数据赋给闭包外的变量。用 nonlocal,但是nonlocal不能侵染模块级别的作用域
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    def sort_priority3(numbers, group):
    found = False
    def helper(x):
    nonlocal found
    if x in group:
    found = True
    return (0,x)
    return (1,x)
    numbers.sort(key=helper)
    return found
  • nonlocal 表明要把数据赋给闭包外的变量,global 表明放到模块作用域中

22. 用数量可变的位置参数给函数设计清晰的参数列表

  • 让函数接受数量可变的位置参数(positional argument),在python里,可以给最后一个位置参数加前缀*,这样调用者只需要提供不带星号的那些参数,然后可以不再指其他参数。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    def log(message, values):
    if not values:
    print(message)
    else:
    values_str = ', 'join(str(x) for x in values)
    print(f'{message}: {values_str}')

    log('My numbers are',[1,2])
    log('Hi there', [])

    def log(message, *values):
    if not values:
    print(message)
    else:
    values_str = ', 'join(str(x) for x in values)
    print(f'{message}: {values_str}')
    log('My numbers are',1,2)
    log('Hi there')
  • 如果想把已有序列里面的元素当成参数传给像参数个数可变的函数,那么可以在传递序列的时采用*操作符
    1
    2
    fav = [7,33,99]
    log('Favorite colors',*fav)
  • 如果*操作符加在生成器前,那么传递参数时,程序有可能因为耗尽内存而崩溃
  • 给接受 *args 的函数添加新位置参数,可能导致难以排查的bug

23. 用关键字参数来表示可选的行为

  • 如果有一份字典,那么可以把**运算符加在字典前面,这会让Python把字典里面的键值以关键词的形式传给函数
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    my_kwargs = {
    'number':20
    'divisor':7
    }
    assert remainder(**my_kwargs) == 6

    my_kwargs = {
    'number':20
    }
    other_kwargs = {
    'divisor':7
    }
    assert remainder(**my_kwargs, **other_kwargs) == 6
  • 关键字参数的灵活用法可以带来三个好处
    1. 用关键字参数调用函数可以让初次阅读代码的人更容易看懂
    2. 可以带有默认值,该值是在定义函数时指定的
    3. 可以很灵活地扩充函数的参数,而不用担心会影响原有的函数调用代码

24.用 None 和 docstring 来描述默认值会变的参数

  • 参数的默认值只会在系统加载这个模块的时候,计算一遍,而不会在每次执行时都重新计算,这通常意味着这些默认值在程序启动后,就已经定下来了,例如下面这个例子,datetime.now() 是不会重新计算的
    1
    2
    3
    4
    5
    6
    def log(message, when=datetime.now()):
    print(f'{when}:{message}')

    log('HI there')
    sleep(0.1)
    log('Hello again')
  • 改成下面这样就可以实现time的变化了
    1
    2
    3
    4
    5
    6
    7
    8
    def log(message, when=None):
    if when is None:
    when = datetime.now()
    print(f'{when}:{message}')

    log('HI there')
    sleep(0.1)
    log('Hello again')
  • 默认值为 None 的关键字参数,也可以添加类型注解
    1
    2
    3
    4
    5
    6
    7
    8
    def log(message:str, when=Optional[datetime]=None) -> None:
    if when is None:
    when = datetime.now()
    print(f'{when}:{message}')

    log('HI there')
    sleep(0.1)
    log('Hello again')
  • 如果关键字参数的默认值属于这种会发生变化的值,那就应该写成 None,并且要在 docstring 里面描述函数此时的默认行为

25.用只能以关键字指定和只能按位置传入的参数来设计清晰的参数列表

  • 对于参数比较复杂的函数,我们可以声明只能通过关键字指定的参数(keyword-only argument),这种参数只能用关键字来指定,不能按位置传递,参数列表里的 * 符号把参数分为两组,左边是位置参数,右边是只能用关键字指定的参数。
    1
    def safe_division_c(number, divisor, * , ignore_overflow=False, ignore_zero_division = False):
  • Python3.8 引入了一项新特性,可以解决这个问题,这就是只能按位置传递的参数(positional-only argument),参数列表中的 / 符号,表示他左边的那些参数只能按位置指定
    1
    def safe_division_c(number, divisor, /, * , ignore_overflow=False, ignore_zero_division = False):
  • /* 中间的参数就是既可以按照位置也可以按照关键字指定的参数

26.用functools.wraps定义函数修饰器

  • Python 可以使用修饰器来封装某个函数,从而让程序在执行这个函数之前与执行完这个函数后,分别运行某些代码。这是个很有用的机制,能够确保以正确的方式使用函数。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    def trace(func):
    def wrapper(*args, **kwargs):
    result = func(*args, **kwargs)
    print(f'{func.__name__}(args!r, {kwargs!r}) '
    f'-> {result!r}')
    return result
    return wrapper

    @trace
    def fibonacci(n):
    if n in (0,1):
    return n
    return (fibonacci(n-1) + fibonacci(n-2))
  • 上述存在几个问题
    1. 干扰那些需要利用 introspection 机制来运作的工具,例如调试器
    2. help()函数打印出来不是我们想看的文档
    3. 对象序列化器也无法正常运作
  • 可以使用 functools 内置模块之中的 wraps 辅助函数,wraps本身就是一个修饰器
    1
    2
    3
    4
    5
    6
    7
    8
    9
    from functools import wraps

    def trace(func):
    @wraps(func)
    def wrapp......

    @trace
    def fibo.....

推导与生成

27. 让列表推导取代map与filter

  • Python经常需要处理list、dict、set等数据结构,并且要以这种处理逻辑为基础来构建程序,叫做推导。运用到函数上产生了生成器(generator)
1
2
3
4
5
6
7
8
9
10
a = [1,2,3]
squ = []

for x in a:
squ.append(x**2)
print(squ)


squ = [x**2 for x in a]
print(squ)
  • map(square, numbers) 会返回一个迭代器,其中包含了列表 numbers 中每个元素经过 square 函数处理后的结果。
    1
    2
    3
    4
    5
    6
    def square(x):  
    return x * x

    numbers = [1, 2, 3, 4, 5]
    squared = map(square, numbers)
    print(list(squared)) # 输出:[1, 4, 9, 16, 25]
  • 字典和集合也可以通过推导生成

28. 控制推导逻辑的子表达式不要超过两个

  • 列表推导还支持多层循环,例如把二维列表转换成一维列表,每层循环还可以带有多个条件
    1
    2
    matrix = [[1,2,3],[4,5,6],[7,8,9]]
    flat = [x for row in matrix for x in row]
  • 控制推导逻辑的子表达式不要超过两个(例如两个if条件),如果实现的逻辑比这还复杂,那应该采用普通的 if 和 for 实现,并且可以使用辅助函数。

29. 用赋值表达式消除推导中的重复代码

  • 编写推导式与生成器表达式时,可以在描述条件的那一部分通过赋值表达式定义变量,并在其他部分复用该变量,可使程序简单易读
  • 对于推导式与生成器表达式来说,虽然赋值表达式也可以出现在描述条件的那一部分之外,但最好别这么写

30. 不要让函数直接返回列表,应该让它逐个生成列表里的值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def index_words(text):
result = []
if text:
result.append(0)
for index, letter in enumerate(text):
if letter == ' ':
result.append(index+1)
return result


def index_words_iter(text):
if text:
yield 0
for index, letter in enumerate(text):
if letter == ' ':
yield index + 1
  • 这种函数改用生成器来实现会更好,调用生成器不会让其中的代码立刻执行,会返回一个迭代器给 Python 的内置函数 next 函数
  • 如果不用生成器,就必须把所有的结果保存到列表中,然后才能返回列表。如果数据特别多,程序可能会因为耗尽内存而崩溃
  • 定义生成器函数的时候,有一点需要注意,调用者无法重复使用函数所返回的迭代器,因为这些迭代器是有状态的

31. 谨慎地迭代函数所收到的参数

  • 正常迭代器在已经把数据耗完的迭代器上面继续迭代,程序不报错
  • Python 的 for 循环及相关表达式,正是按照迭代器协议来遍历容器内容的。Python执行for x in foo 这样的语句时,实际会调用iter(foo),也就是把foo传给内置的iter函数。这个函数会触发名为foo.__iter__的特殊方法,该方法必须返回迭代器对象(这个对象本身要实现_next_特殊方法),最后 Python 会用迭代器对象反复调用内置的 next 函数,直到数据耗尽为止。如下
    1
    2
    3
    4
    5
    6
    7
    8
    class ReadVisits:
    def __init__(self, data_path):
    self.data_path = data_path

    def __iter__(self):
    with open(self.data_path) as f:
    for line in f:
    yield int(line)
  • 可以把值传给iter函数,检测它返回的是不是那个值本身。如果是,就说明这是个普通的迭代器,而不是一个可以迭代的容器。另外,也可以用内置的 isinstance 函数判断该值是不是 collection.abc.Iterator 类的实例

32.考虑用生成器表达式改写数据量较大的列表推导

  • 列表推导可以根据输入序列的么哦个元素创建一个包含派生元素的新列表,但是如果输入的数据量很大,那么程序可能内存耗尽而崩溃
  • 处理大规模数据,可以使用生成式表达式,写法与列表推导式语法类似,但它是写在圆括号里面,而不是方括号
  • 生成器表达式还可以组合迭代,可以用上一个生成器表达式作为输入,编写新的生成器表达式
    1
    2
    3
    4
    value = [len(x) for x in open('my_file.txt')]

    value = (len(x) for x in open('my_file.txt'))
    roots = ((x,x**0.5) for x in value)
  • 需要注意的是,生成器表达式返回的迭代器是有状态的,跑完一轮就不可以再用了

33. 通过yield from把多个生成器连起来用

  • 反复使用for结构来操纵生成器,而且for结构都使用相同的yield表达式,这样看上去很啰嗦。可以改用yield from
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    def animate():
    for delta in move(4,5.0):
    yield delta
    for delta in pause(3):
    yield delta
    for delta in move(2,3.0):
    yield delta

    def animate_compose():
    yield from move(4,5.0)
    yield from pause(3)
    yield from move(2,3.0)

34. 不要用 send 给生成器注入数据

  • yield 表达式让我们能够轻松地写出生成器函数,但问题是,这种通道是单向的,也就是说,无法让生成器一端接受数据流,另一端给出计算结果。
  • Python 生成器支持 send 方法,这可以让生成器变成双向通道。send可以把参数发给生成器,让它成为上一条yield表达式的求值结果,并将生成器推进到下一条yield表达式。
  • iter() 函数在Python中的作用是返回一个迭代器对象。这个函数通常用于将可迭代对象(如列表、元组、集合、字典等)转换为一个迭代器,这样就可以使用next()函数来逐个访问其中的元素。
  • send() 函数作用在于向生成器发送数据,这个数据会成为生成器中上一次暂停的 yield 表达式的返回值,然后生成器继续运行直到下一次 yield 表达式。
  • 使用 send() 函数时需要注意,第一次调用生成器时通常需要使用 next() 函数,而不是 send(),因为生成器尚未启动,暂无 yield 表达式供输入数据。
  • 把 send 方法与 yield from 表达式搭配起来使用,可能会导致奇怪的结果
  • 通过迭代器向组合起来的生成器输入数据,要比采用send方法哪种方案好,尽量避免使用 send 方法

35. 不要通过 throw 变换生成器的状态

  • throw 方法可以把异常发送到生成器刚执行的那条yield表达式那里,让这个异常在生成器下次推进时重新抛出
  • 通过 throw 方法注入异常,会让代码变得难懂
  • 应该通过 iter 方法实现生成器,并提供一个方法,让调用者通过方法来触发特殊状态变换逻辑

36. 考虑用 itertools 拼装迭代器与生成器

链接多个迭代器
  • chain 多个迭代器从头到尾连成一个迭代器
    1
    2
    3
    4
    it = itertools.chain([1,2,3], [4,5,6])
    print(list(it))

    [1,2,3,4,5,6]
  • repeat 重复迭代器,第二个参数指定输出几次
    1
    2
    3
    4
    it = itertools.repeat('hello',3)
    print(list(it))

    ['hello','hello','hello']
  • cycle 制作一个循环输出的各项元素
    1
    2
    3
    it = itertools.cycle([1,2])

    [1,2,1,2,1,2,1,2...]
  • tee 可以让一个迭代器分裂成多个平行的迭代器
    1
    2
    3
    4
    5
    6
    7
    8
    it1,it2,it3 = itertools.tee(['first', 'second'], 3)
    print(it1)
    print(it2)
    print(it3)

    ['first', 'second']
    ['first', 'second']
    ['first', 'second']
  • zip_longest 与zip函数类似,区别在于如果长度不同,它会用fillvalue参数的值填补提前耗尽的空缺
    1
    2
    3
    4
    5
    6
    keys = ['one','two','three']
    values = [1,2]

    it = litertools.zip_longest(keys, values, fillvalue='nope')

    [('one',1),('two',2),('three','nope')]
过滤源迭代器中的元素
  • islice 按照下标切割源迭代器
    1
    2
    3
    4
    5
    6
    values = [1,2,3,4,5,6,7,8,9,10]
    first = itertools.islice(values, 5)
    middle = itertools.islice(values,2,8,2)

    [1,2,3,4,5]
    [3,5,7]
  • takewhile 会一直从源迭代器里获取元素,直到元素让测试函数返回False
    1
    2
    3
    4
    5
    6
    values = [1,2,3,4,5,6,7,8,9,10]

    less_than_seven = lamda x: x<7
    it = itertools.takewhile(less_than_seven, values)

    [1,2,3,4,5,6]
  • dropwhile 相反
  • filterfalse 和 filter 相反,逐个返回输出源迭代器里使得测试桉树返回 False 的那些元素
    1
    2
    3
    4
    5
    6
    values = [1,2,3,4,5,6,7,8,9,10]
    evens = lamda x:x%2==0

    filter_result = itertools.filterfalse(evens, values)

    [1,3,5,7,9]
用源迭代器中的元素合成新元素
  • accumulate 从源迭代器里取出一个元素,并把已经累计的结果与这个元素一起传给表示累加逻辑的函数
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    values = [1,2,3,4,5,6,7,8,9,10]
    sum_reduce = itertools.accumulate(values)
    print('Sum: ', list(sum_reduce))

    def sum_modulo_20(first, second):
    output = first + second
    return output % 20

    modulo = itertools.accumulate(values, sum_modulo_20)

    [1,3,6,10,15,21,28,36,45,55]
    [1,3,6,10,15,1,8,16,5,15]
  • product 会产生笛卡尔积
    1
    2
    3
    4
    5
    single = itertools.product([1,2],repeat = 2)
    muliple = itertools.product([1,2],['a','b'])

    [(1,1),(1,2),(2,1),(2,2)]
    [(1,'a'),(1,'b'),(2,'a'),(2,'b')]
  • permutations 会考虑源迭代器所能给出的全部元素,并逐个输出由其中N个元素形成的每种有序排列方式,元素相同但是顺序不同算两种
  • combinations 会考虑源迭代器所能给出的全部元素,并逐个输出由其中N个元素形成的每种有序排列方式,元素相同但是顺序不同算一种
  • combinations_with_replacement 会考虑源迭代器所能给出的全部元素,并逐个输出由其中N个元素形成的每种有序排列方式,允许一个元素出现多次

37. 用组合起来的类来实现多层结构,不要用嵌套的内置类型

  • 不要在字典里嵌套字典、长元祖,以及用其他内置类型构造的复杂结构
  • 如果发现单个类,内部状态代码很复杂,那么就应该考虑改用多个类实现

38. 让简单的接口接受函数,而不是类的实例

  • python 许多内置的API,允许传入某个函数来定制它的行为,这种函数叫做挂钩(hook),API 在执行过程中,会回调(call back)挂钩函数
  • 其他编程语言中,挂钩可能会用抽象类来定义,但在 Python 中,许多挂钩是无状态函数,带有明确的参数与返回值
  • 通过挂钩函数可以很容易的构建出便于测试的 API,这种 API 可以把挂钩所实现的附加效果与数据所应具备的确定行为本身分开
  • Python 的函数与方法都是头等对象,这意味着它们可以像其他类型那样,用在表达式里
  • 某个类如果定义了 call 特殊方法,那么它的实例就可以像普通的 Python 函数那样调用
  • 如果想用函数来维护状态,那么可以考虑定义一个带有 call 方法的新类,而不要用有状态的闭包实现

39. 通过 @classmethod 多态来构造同一体系中的各类对象

  • Python 中不仅对象支撑多态,类也支持多态
  • 可以按照超类的形式统一地构造这些对象,并使其根据所属的子类分别去出发相关的特殊构造函数(工厂模式)
  • 但是 Python 里面不能这样做,因为 Python 的类只能有一个构造方法(init),可以通过类方法多态
  • 如果想在超类中用通用的代码构造子类实例,那么可以考虑定义@classmethod方法,并在里面用 cls(…) 的形式构造具体的子类对象
  • 通过类方法多态机制,我们能够以通用的形式构造并拼接具体的子类对象

40. 通过 super 初始化超类

  • 直接调用 init 方法所产生的一个问题在于,超类的构造逻辑不一定会按照它们在子类的 class 语句中的声明顺序
  • 第二个问题在于无法正确处理菱形继承,也就是子类通过两条不同路径继承了同一个超类
  • python 有标准的方法解析顺序(MRO)规则,可以用来判定超类之间的初始化顺序,并解决菱形继承问题
  • 可以通过 Python 内置的 super 函数正确触发超类的 init 逻辑

41. 考虑用 mix-in 类来表示可组合的功能

  • Mix-in类通常满足以下特定要求:
  1. 单一职责:Mix-in类通常只包含特定的功能,而不需要与整个系统或领域相关的广泛功能。这使得它们更容易被重用并且降低了引入错误的风险。

  2. 不依赖于其他类:Mix-in类通常不会依赖于不在其同一层次上的其他类的具体实现。

  3. 通过组合提供额外功能:Mix-in类通常通过提供额外的方法、属性或者其他功能性的方式,来通过组合形式提供额外的功能。

  4. 协议无关性:Mix-in类通常能够与许多不同的类协同工作,而不需要与特定的类进行耦合。这使得它们具有很强的通用性。

当一个类满足了上述要求,它就可以被称为Mix-in类。

  • 超类最高是能写成不带实体属性与 init 方法的 mix-in 类,避免多重继承所引发的问题
  • 把每个 mix-in 所提供的简单功能组合起来,可以实现比较复杂的功能
  • 在Python中,一个变量前面加上**代表在函数调用时将一个字典作为关键字参数进行传递,并且会进行解包操作。

42. 优先考虑用 public 属性表示应受保护的数据,不要用 private 属性表示

  • 如果属性名以两个下划线开头,就是 private 字段。属性所在的类可以通过实例方法访问该属性。
  • 子类不能访问超类的 private 字段
  • 而实现防止其他类访问 private 属性的功能,其实是仅仅通过变换属性名称而实现。例如 __private_field 是在 MyParentObjet 里的 init 里面定义的,所以变换之后的真实名称是 _MyParentObjet_private_field。子类无法访问。
  • 了解名称变换规则后,我们就可以从任何一个类里面访问 private 属性。无论子类还是外部类例如 baz. _MyParentObjet_private_fiel
  • 为什么 Python 都不从语法上严格禁止其他类访问 private 属性呢?用一句常见的格言:我们都是成年人了
  • python 习惯单划线设计 protected 字段,并用文档加以解释,而不要通过 private 属性限制访问
  • 只有在子类不受控制且名称有可能与超类冲突,才可以考虑给超类设计 private 属性

43. 自定义的容器类型应该从 collections.abc 继承

  • 为了让 BinaryNode 可以像序列那样使用,可以定义 getitem 方法,这个方法会按照深度优先的方式遍历 BinaryNode 对象所表示的二叉树

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    class BinaryNode:
    def __init__(self, value, left=None, right=None):
    self.value = value
    self.left = left
    self.right = right

    class IndexableNode(BinaryNode):
    def _traverse(self):
    if self.left is not None:
    yield from self.left._traverse()
    yield self
    if self.right is not None:
    yield from self.right._traverse()

    def __getitem__(self, index):
    for i, item in enumerate(self._traverse()):
    if i == index:
    return item.value
    raise IndexError(f'index {index}' our of range)
  • 直接定义 len() 方法和定义 __len__() 方法的区别在于语法和用途。如果你直接定义 len() 方法,你必须按照函数的方式来定义和调用它。这意味着你需要在类的定义中显式地添加一个名为len的方法,然后在实例上调用它来获取对象的长度或者元素个数。例如:

    1
    2
    3
    4
    5
    6
    7
    class MyList:  
    def len(self):
    # 计算并返回长度
    pass

    my_list = MyList()
    length = my_list.len()
  • 当你调用内置的len函数来获取对象的长度时,它实际上会尝试调用该对象的__len__方法。这意味着如果你定义了一个类,并在其中实现了__len__方法,那么你就可以使用len函数来获取该类的长度或者元素个数。如果你有一个名为MyList的类,并在其中实现了__len__方法,你可以这样使用:

1
2
my_list = MyList()  
length = len(my_list)
  • python 中内置的 collections.abc 模块定义了一系列抽象基类(abstract base class),模块要求子类必须实现某些特殊方法,保证不会把必备功能漏掉

元类与属性

44. 用纯属性与修饰器取代旧式的 setter 与 getter 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Resistor:
def __init__(self, ohms):
self.ohms = ohms
self.voltage = 0
self.current = 0


class VoltageResistance(Resistor):
def __init__(self, ohms):
super().__init__(ohms)
self._voltage = 0

@property
def voltage(self):
return self._voltage

@voltage.setter
def voltage(self, voltage):
self._voltage = voltage
self.current = self._voltage /self.ohms

  • 给新类定义接口时,应该先从简单的 public 属性写起,避免定义 setter 与 getter 方法
  • 如果在访问属性确定有必要的处理,通过 @property 来定义属性的设置属性方法
  • @property 方法必须执行得很快。复杂或缓慢的任务,尤其是涉及 I/O 或者引发副作用的那些任务,就使用普通方法

45.考虑用 @property 实现新的属性访问逻辑,不要急着重构原有的代码

  • @property 可以说是一种重要的缓冲机制,使开发者能够逐渐改善接口而不影响已经写好的代码
  • 如果您实现了 __repr__ 方法,当您在交互式环境中输入这个对象的名称时,会展示 __repr__ 方法返回的字符串。这样可以帮助您更清楚地了解对象的信息,以及在调试时有助于确认对象的状态。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    class Example:  
    def __init__(self, x, y):
    self.x = x
    self.y = y

    def __repr__(self):
    return f"Example({self.x}, {self.y})"

    obj = Example(3, 4)
    print(obj) # 这会调用 __repr__ 方法
  • @property 装饰器用于将类方法转换为只读属性。这意味着您可以通过属性的方式访问这个方法,而无需使用显式的函数调用。

46.用描述符来改写需要复用的 @property 方法

  • @property 方法加上 setter 组合,很麻烦。 getset 方法

47. 针对惰性属性使用 getattr__、__getattribute__、__setattr

  • __getattr__只会在属性缺失时触发、__getattribute__则在每次访问属性时都要触发
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    class DynamicAttributes:  
    def __init__(self):
    self.custom_attributes = {'color': 'blue', 'size': 'medium'}

    def __getattr__(self, name):
    if name in self.custom_attributes:
    return self.custom_attributes[name]
    else:
    return f"Attribute '{name}' not found."

    # 使用例子
    item = DynamicAttributes()
    print(item.color) # 输出:'blue'
    print(item.material) # 输出:'Attribute 'material' not found.'

    class LoggingAttributes:
    def __init__(self):
    self.logged_attributes = []

    def __getattribute__(self, name):
    if name.startswith('get_'):
    attribute_name = name[4:]
    value = super().__getattribute__(attribute_name) # 使用 super 避免无限递归
    self.logged_attributes.append(f"Accessed {attribute_name}")
    return value
    else:
    return super().__getattribute__(name)

    def get_custom_value(self):
    return "Some value"

    # 使用例子
    log_item = LoggingAttributes()
    print(log_item.get_custom_value()) # 输出:"Some value"
    print(log_item.logged_attributes) # 输出:['Accessed custom_value']

    class ProtectedAttributes:
    def __init__(self):
    self._protected_value = None

    def __setattr__(self, name, value):
    if name.startswith('_'):
    raise AttributeError("Attribute name cannot start with '_'")
    else:
    self.__dict__[name] = value

    # 使用例子
    protected_item = ProtectedAttributes()
    protected_item.public_value = "Allowed" # 正常设置
    protected_item._protected_value = "Not Allowed" # 抛出异常:AttributeError: Attribute name cannot start with '_'



48. 用__init_subclass__验证子类写得是否正确

  • __new__ 方法是 Python 中新式类 (new-style class) 的一个特殊方法。它在一个类实例被创建时被调用,负责创建实例并返回该实例。与之不同的是,__init__ 方法负责实例的初始化。
  • @classmethod 是 Python 中的一个装饰器,用于定义类方法。类方法是一种绑定到类而不是实例的方法。这意味着即使没有创建类的实例,也可以通过类本身来调用这个方法。
  • @classmethod@staticmethod 都是用来定义特定类型的方法,但它们有一些关键的区别:
    参数差异:
    @classmethod 的方法的第一个参数通常是 cls,代表类本身。通过这个参数,可以访问类的属性和其他类方法。
    @staticmethod 的方法不需要额外的参数来表示类或实例。它不需要 selfcls 参数,因此在方法内部无法访问类属性或实例属性。
    使用场景:
    通常情况下,使用 @classmethod 定义类方法,当需要在方法内部使用类的属性时,或者需要通过类调用方法而不是通过实例调用方法时。
    使用 @staticmethod 定义静态方法,当方法不需要与类或者实例有太多关联,仅仅是在类的命名空间中为组织目的而定义时。
    方法访问:
    通过类调用 @classmethod 方法时,方法可以访问并操作类属性。
    通过类调用 @staticmethod 方法时,方法无法直接访问类属性,因为它不会接收到对类的引用,也无法访问实例属性。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    class MyClass:  
    class_variable = 10

    @classmethod
    def class_method(cls):
    print(f"Class method accessing class_variable: {cls.class_variable}")

    @staticmethod
    def static_method():
    print("This is a static method")

    # 调用方法
    MyClass.class_method() # 可以访问并操作类属性
    MyClass.static_method() # 无法访问类属性
  • python 3.6 之后定义了 init_subclass 这个特殊的类方法实现相同的功能,这样就不用专门定义元类了
  • 在分层的或者涉及多重继承的类体系里面,一定别忘了在你写的这些类的 _init__subclass 内通过 super() 来调用超类的 _init_subclass 方法,以便按照正确的顺序触发各类的验证逻辑

49. 用 init_subclass 记录现有的子类

  • 类注册是个非常有用的模式,可以通过基类的元类把用户从这个基类派生出来的子类自动注册给系统
  • 优先考虑 __init_subclass__实现自动注册,更清晰,更便于初学者理解

50. 用 set_name 给类属性加注解

51. 优先考虑通过类修饰器来提供可组合的扩充功能,不要使用元类

  • 类修饰器是一种用于修改类或其方法行为的特殊类型的装饰器。它们允许你在不修改原始类或方法定义的情况下,动态地修改或扩展它们的行为。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    class CallCounter:  
    def __init__(self, f):
    self.f = f
    self.calls = 0

    def __call__(self, *args, **kwargs):
    self.calls += 1
    print(f"Method {self.f.__name__} called {self.calls} times")
    return self.f(*args, **kwargs)

    @CallCounter
    class MyClass:
    def __init__(self):
    pass

    def some_method(self):
    print("Doing something")

    # 使用类修饰器
    obj = MyClass()
    obj.some_method()
    obj.some_method()

并发与并行

52. 用 subprocess 管理子进程

  • 并发指计算机似乎能在同一时刻做许多件不同的事情。例如,在只配有一个 CPU 核心的计算机上,操作系统切换程序,造成同时运行的假象
  • 并行与并发的区别在于,它强调计算机确实能够在同一时刻做许多不同的事情。例如多核 CPU
  • python 很容易写出各种风格的并发程序,并发量小的时候可以使用线程(thread),如果要运行大量的并发函数,那么可以使用协程(coroutine)。并行程序需要使用系统调用、子进程与 C 语言扩展来实现,但要写真正能够并行的 Python 代码,其实很困难。
  • Python 里面有许多方式都可以运行子进程(例如 os.popen 函数以及 os.exec* 系列的函数),其中最好的办法是通过内置的 subprocess 模块来管理
  • subprocess.run 函数是在Python 3.5中引入的,它更适合简单的情况。它以阻塞的方式运行外部命令,直到命令完成,然后返回一个 CompletedProcess 对象,其中包含有关进程的信息,例如退出码等。
  • subprocess.Popen 函数更加灵活,可以与 communicate 方法一起使用,这样可以在子进程执行期间与其进行交互,包括输入和输出。它返回一个 Popen 对象,你可以使用它来控制、监视或与启动的进程进行交互。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    import subprocess

    result = subprocess.run(
    ['echo', 'Hello'],
    capture_output=True,
    encoding='utf-8'
    )

    result.check_returncode()
    print(result.stdout)
  • process.communicate() 是一个用于与子进程进行交互的方法,通常与 subprocess.Popen 一起使用。这个方法会阻塞主进程,直到子进程执行结束。它允许主进程向子进程发送数据,并获取子进程的输出数据和错误数据。当调用 communicate() 时,如果子进程有输出,它将返回一个包含标准输出和标准错误输出的元组 (stdoutdata, stderrdata)。对于没有输出的流,将返回 None。调用 communicate() 后,子进程完成执行。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    import subprocess  

    # 启动一个命令
    process = subprocess.Popen(["ls", "-l"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # 获取输出和错误
    output, error = process.communicate()

    # 打印输出和错误
    print(output)
    print(error)
  • proc.stdin.write()proc.stdin.flush() 是用于与子进程的标准输入流(stdin)进行交互的函数。

proc.stdin.write() 用于将数据写入子进程的标准输入流。它允许主进程向子进程发送数据。这个方法会将数据写入标准输入缓冲区,但并不会立即发送到子进程。

proc.stdin.flush() 用于刷新标准输入流的缓冲区,强制将缓冲区中的数据发送到子进程。这在需要确保数据被立即发送给子进程时很有用。

1
2
3
4
5
6
7
8
9
10
import subprocess  

# 启动一个命令
process = subprocess.Popen(["cat"], stdin=subprocess.PIPE)

# 写入数据到子进程的标准输入
process.stdin.write(b"Hello, subprocess\n")

# 刷新标准输入缓冲区,确保数据被发送到子进程
process.stdin.flush()
  • subporcess 可以运行子进程并管理输入流与输出流
  • 子进程和 python 解释器所在的进程并行,从而充分利用 CPU
  • communicate 方法可以制定 timeout 参数,让我们可以把死锁或已经卡住的子进程关掉

53. 可以用线程执行阻塞式 I/O,但不要用它做并行计算

  • Python 语言的标准实现是 Cpython,,分为两步来运行 python 程序。首先解析源代码文本,编程字节码(bytecode)。字节码是一种底层代码,把程序表示为 8 位的指令(从 Python3.6 开始,底层代码实际上已经变成 16 位了,所以应该叫做 wordcode)。然后 Cpython 采用基于栈的解释器来运作字节码。字节码解释器在执行 Python 程序的过程中,必须确保相关的状态不受干扰,Cpython会有一种全局解释器锁(GIL)机制。
  • Python 线程会受 GIL 约束,每次或许只能有一条线程向前推进。
  • 尽管 Python 支持多线程,但在处理 CPU 密集型任务时,多线程的效果并不明显,因为线程的执行仍然受到 GIL 的约束。然而,对于 I/O 密集型任务,多线程在 Python 中仍然具有优势,因为线程在等待 I/O 操作完成时可以释放 GIL,并允许其他线程执行。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    from threading import Thread
    import time

    class FactorizeThread(Thread):
    def __init__(self, number):
    super().__init__()
    self.number = number

    def run(self) -> None:
    self.factors = list(factorize(self.number))


    def factorize(number):
    for i in range(1, number+1):
    if number % i ==0:
    yield i

    numbers = [2139079, 1214759, 1516637, 1852285]
    start = time.time()

    threads = []
    for number in numbers:
    thread = FactorizeThread(number)
    thread.start()
    threads.append(thread)

    for thread in threads:
    thread.join()


    end = time.time()
    delta = end - start
    print(f'Took {delta:.3f} seconds')
  • 多余 Python 线程可以并行执行多个系统调用,这样就可以让程序在执行阻赛式的 I/O 任务时,继续做其他运算。
  • 具体来说,对于一个线程列表(或类似的可迭代对象)中的每个线程,调用 join() 方法将会阻塞当前线程,直到被调用的线程执行结束。换句话说,当主线程执行这段代码时,它会等待所有在 threads 中的线程执行结束,然后再继续执行主线程后面的代码。这对于需要确保所有线程都执行完毕后再进行后续操作的情况非常有用。

54. 利用 Lock 防止多个线程争用同一份数据

  • 即使有 GIL 全局解释锁,依然需要互斥锁,多个线程同时访问同一个对象是很危险的。每条线程在操作这份数据时,都有可能遭到其他线程破坏。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    from threading import thread

    how_many = 10 ** 5
    counter = Counter()

    threads = []
    for i in range(5):
    thread = Thread(target=worker, args = (i, how_many, Counter))
    threads.append(thread)
    thread.start()

    for thread in threads:
    thread.join()
  • 上面这个程序结果会出错,是因为 python并不知道具体会什么时候暂停线程,万一这条线程正在执行的是一项本来不应该中断的原子操作,就会出错。
    1
    2
    3
    4
    5
    counter.count += 1
    # 其实执行时会变成下面的语句
    value = getattr(counter, 'count')
    result = value + 1
    setattr(counter, 'count', result)
  • python把这条线程切换走,再切换回来,value 是没有更新过的 value 所以是错的。要解决这个问题只需要加一个互斥锁 Lock
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    from threading import Lock

    class LockingCounter:
    def __init__(self):
    self.lock = Lock()
    self.count = 0

    def increment(self, offset):
    with self.lock:
    self.count += offset

55. 用 Queue 来协调各线程之间的工作进度

  • 使用常规方法实现生产者消费者模型,问题在于

    1. 为了判断全部产品是否加工完毕,必须反复查询最后那个队列,以确认里面的元素个数是否已经变得与刚开始的原料总数相同
    2. 目前这种方案会使 run 方法陷入无限循环,,无法明确知道线程何时应该退出
    3. 下游环节处理过慢,程序可能崩溃。
  • 可以使用 Queue 中的 queue 解决上述问题,因为它的 get 方法会一直阻塞。可以限定 Queue 最多多少个元素,通过 put 方法添加新元素,直到队列里有空位为止。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    from queue import Queue
    from threading import Thread

    my_queue = Queue()

    def consumer():
    print("Consumer waiting")
    my_queue.get()
    print("done")


    print("putting")
    my_queue.put(1)
    print("putting done")

    thread = Thread(target=consumer())
    thread.start()
    thread.join()
1
2
3
4
5
6
7
8
9
10
11
12
my_queue = Queue(1)

def consumer():
time.sleep(0.1)
my_queue.get()
print('Consumer got 1')
my_queue.get()
print('Consumer got 2')
print('Consumer done')

thread = Thread(target = consumer)
thread.start()
  • Queue 还可以通过 task_done 方法告诉程序它已经处理完其中一个元素处理完了。就不用反复查询队列中的数据了,只需要调用 join() 。即便队列中的元素已经全部取走,只要 task_done 方法执行次数不足,就会卡住,直到早前所有加入队列的元素都调用一次 task_done 方法为止。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    from queue import Queue
    from threading import Thread

    def download(item):
    print(1)

    def resize(item):
    print(2)

    def upload(item):
    print(3)

    class ClosableQueue(Queue):
    SENTINEL = object()

    def close(self):
    self.put(self.SENTINEL)

    def __iter__(self):
    while True:
    item = self.get()
    try:
    if item is self.SENTINEL:
    return
    yield item
    finally:
    self.task_done()

    class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
    super().__init__()
    self.func = func
    self.in_queue = in_queue
    self.out_queue = out_queue

    def run(self):
    for item in self.in_queue:
    result = self.func(item)
    self.out_queue.put(result)

    download_queue = ClosableQueue()
    resize_queue = ClosableQueue()
    upload_queue = ClosableQueue()
    done_queue = ClosableQueue()

    threads = [
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue),
    ]

    for thread in threads:
    thread.start()

    for _ in range(1000):
    download_queue.put(object())

    download_queue.close()
    download_queue.join()
    resize_queue.close()
    resize_queue.join()
    upload_queue.close()
    upload_queue.join()
    print(done_queue.qsize(), 'item finished')

    for thread in threads:
    thread.join()

56. 学会判断什么场合必须做并发

  • 程序范围变大,需求变复杂后,经常要用多条路径平行处理任务
  • 针对每个工作单元开辟一条执行路径,这种模式叫做扇出(fan-out)。等待这些并发的工作单元完工,才能执行下一个环节,叫做扇入(fan-in)。

57. 不要在每次 fan-out 时都新建一批 Thread 实例

  • 每次都手工创建一批线程,是有很多缺点的
    1. 必须专门采用工具,例如 LOCK 来管理协调这些线程
    2. 线程占用内存较多,每条线程大概 8 MB
    3. 系统频繁切换线程,会降低程序的运行效率
  • 线程本身不会把执行过程中遇到的异常抛给启动线程或者等待该线程完工的那个人,所以异常难以调试

58. 学会正确地重构代码,以便用Queue 做并发

  • 就是用内置的 queue 模块里的 Queue 类实现多线程管道。把队列(Queue)与一定数量的工作线程搭配起来,可以高效地实现fan-out(分派)与 fan-in(归集)
  • 改用队列方案来处理 I/O,我们重构了许多代码,如果管道要分成好几个环节,那么修改的地方会很多
  • 可以考虑直接用 python 内置的某些功能与模块打造更好的方案

59. 如果必须用线程做并发,那就考虑通过 ThreadPoolExecutor

  • 利用 ThreadPoolExecutor ,我们只需要稍微调整一下代码,就能够并行地执行简单的 I/O 操作
  • 虽然 ThreadPoolExecutor 不像直接启动线程的方案那样,需要消耗大量内存,但它的 I/O 并行能力有限。它最大线程数需要提前通过 max_workers 参数指定
  • 内置模块中的 concurrent.futures 提供了 ThreadPoolExector 类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    from concurrent.futures import ThreadPoolExecutor  

    def some_function(param):
    # 定义一些操作
    return result

    # 创建一个包含5个工作线程的线程池
    with ThreadPoolExecutor(max_workers=5) as executor:
    # 提交任务到线程池
    future = executor.submit(some_function, some_parameter)
    # 获取结果
    result = future.result()

60. 用协程实现高并发的 I/O

  • 在并发方面要求比较高的 I/O 需求,可以用 Python 的协程 (coroutine) 解决
  • 协程可以制造出一种效果,让我们觉得 Python 程序好像真的可以同时执行大量任务。这种效果需要使用 async 和 await 关键字实现,它的基本原理与生成器类似
  • 启动协程也是有代价的,必须做函数调用。激活后,协程只占不到 1KB 内存,所以只要内存足够,协程多也没关系
  • 协程和线程之间的区别是,它不会把这个函数从头到尾执行完,而是每遇到一个 await 表达式就暂停,下次继续执行的时候先等待 await 所针对的那项 awaitable 操作有了结果,然后再推进到下一个 await 表达式
  • 协程不会像线程那样占用很多内存,启动切换开销也很小,而且不需要用复杂的代码来实现加锁或同步。这种强大的机制是通过事件循环(event loop)打造的
  • 协程优点,可以把那些与外部环境交互的代码(例如 I/O 调用)与那些实现自身需求的代码(例如事件循环)解耦。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    import asyncio
    import aiofiles

    async def read_file(file_name):
    print(f"Reading file: {file_name}")
    async with aiofiles.open(file_name, 'r') as file:
    content = await file.read()
    print(f"File {file_name} contents: {content}")


    async def main():
    files = ["merge.py", "sortList.py", "quiver1.py"]
    await asyncio.gather(*(read_file(file) for file in files))


    if __name__ == "__main__":
    asyncio.run(main())

61.学会用 asyncio 改写那些通过线程实现的 I/O

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
import random
import contextlib
import math
import socket
from threading import Thread
import asyncio
class EOFError(Exception):
pass

class connectionBase:
def __init__(self, connection):
self.connection = connection
self.file = connection.makefile('rb')

def send(self, command):
line = command + '\n'
data = line.encode()
self.connection.send(data)

def receive(self):
line = self.file.readline()
if not line:
raise EOFError('Connection closed')
return line[:-1].decode()

class AsyncConnectionBase:
def __init__(self, reader, writer):
self.reader = reader
self.writer = writer

async def send(self, command):
line = command + '\n'
data = line.encode()
self.writer.write(data)
await self.writer.drain()

async def receive(self):
line = await self.reader.readline()
if not line:
raise EOFError('Connection closed')
return line[:-1].decode()

WARMER = 'Warmer'
COLDER = 'Colder'
UNSURE = 'Unsure'
CORRECT = 'Correct'

class UnknownCommandError(Exception):
pass

class Session(connectionBase):
def __init__(self, *args):
super().__init__(*args)
self._clear_state(None,None)

def _clear_state(self, lower, upper):
self.lower = lower
self.upper = upper
self.secret = None
self.guesses = []

def loop(self):
while command := self.receive():
parts = command.split(' ')
if parts[0] == 'PARAMS':
self.set_params(parts)
elif parts[0] == 'NUMBER':
self.send_number()
elif parts[0] == 'REPORT':
self.receive_report(parts)
else:
raise UnknownCommandError(command)

def set_params(self, parts):
assert len(parts) == 3
lower = int(parts[1])
upper = int(parts[2])
self._clear_state((lower, upper))

def next_guess(self):
if self.secret is not None:
return self.secret

while True:
guess = random.randint(self.lower, self.upper)
if guess not in self.guesses:
return guess

def send_number(self):
guess = self.next_guess()
self.guesses.append(guess)
self.send(format(guess))

def receive_report(self, parts):
assert len(parts) == 2
decision = parts[1]

last = self.guesses[-1]
if decision == CORRECT:
self.secret = last

print(f'Server: {last} is {decision}')

class AsyncSession(AsyncConnectionBase):
def __init__(self, *args):
super().__init__(*args)
self._clear_state(None,None)

def _clear_state(self, lower, upper):
self.lower = lower
self.upper = upper
self.secret = None
self.guesses = []

async def loop(self):
while command := await self.receive():
parts = command.split(' ')
if parts[0] == 'PARAMS':
self.set_params(parts)
elif parts[0] == 'NUMBER':
self.send_number()
elif parts[0] == 'REPORT':
self.receive_report(parts)
else:
raise UnknownCommandError(command)

def set_params(self, parts):
assert len(parts) == 3
lower = int(parts[1])
upper = int(parts[2])
self._clear_state((lower, upper))

def next_guess(self):
if self.secret is not None:
return self.secret

while True:
guess = random.randint(self.lower, self.upper)
if guess not in self.guesses:
return guess

async def send_number(self):
guess = self.next_guess()
self.guesses.append(guess)
await self.send(format(guess))

def receive_report(self, parts):
assert len(parts) == 2
decision = parts[1]

last = self.guesses[-1]
if decision == CORRECT:
self.secret = last

print(f'Server: {last} is {decision}')

class Client(connectionBase):
def __init__(self, *args):
super().__init__(*args)
self._clear_state()

def _clear_state(self):
self.secret = None
self.last_distance = None

@contextlib.contextmanager
def session(self, lower, upper, secret):
print(f'Guess a number between {lower} and {upper}!'
f'shhhhh, it\'s {secret}')
self.secret = secret
self.send(f'PARAMS {lower} {upper}')
try:
yield
finally:
self._clear_state()
self.send('PARAMS 0 -1')

def request_number(self, count):
for _ in range(count):
self.send('NUMBER')
data = self.receive()
yield int(data)
if self.last_distance == 0:
return

def report_outcome(self, number):
new_distance = math.fabs(number - self.secret)
decision = UNSURE

if new_distance == 0:
decision = CORRECT
elif self.last_distance is None:
pass
elif new_distance < self.last_distance:
decision = WARMER
elif new_distance > self.last_distance:
decision = COLDER

self.last_distance = new_distance

self.send(f'REPORT {decision}')
return decision

class AsyncClient(AsyncConnectionBase):
def __init__(self, *args):
super().__init__(*args)
self._clear_state()

def _clear_state(self):
self.secret = None
self.last_distance = None

@contextlib.asynccontextmanager
async def session(self, lower, upper, secret):
print(f'Guess a number between {lower} and {upper}!'
f'shhhhh, it\'s {secret}')
self.secret = secret
await self.send(f'PARAMS {lower} {upper}')
try:
yield
finally:
self._clear_state()
await self.send('PARAMS 0 -1')

async def request_number(self, count):
for _ in range(count):
await self.send('NUMBER')
data = await self.receive()
yield int(data)
if self.last_distance == 0:
return

async def report_outcome(self, number):
new_distance = math.fabs(number - self.secret)
decision = UNSURE

if new_distance == 0:
decision = CORRECT
elif self.last_distance is None:
pass
elif new_distance < self.last_distance:
decision = WARMER
elif new_distance > self.last_distance:
decision = COLDER

self.last_distance = new_distance

await self.send(f'REPORT {decision}')
return decision


def handle_connection(connection):
with connection:
session = Session(connection)
try:
session.loop()
except EOFError:
pass

def run_server(address):
with socket.socket() as listener:
listener.bind(address)
listener.listen()
while True:
connection, _ = listener.accept()
thread = Thread(target=handle_connection,
args = (connection,),
daemon = True)
thread.start()

def run_client(address):
with socket.create_connection(address) as connection:
client = Client(connection)

with client.session(1,5,3):
results = [(x, client.report_outcome(x))
for x in client.request_number(5)]

with client.session(10, 15, 12):
for number in client.request_number(5):
outcome = client.report_outcome(number)
results.append((number, outcome))

return results


async def handle_async_connection(reader, writer):
session = AsyncSession(reader, writer)
try:
await session.loop()
except EOFError:
pass

async def run_async_server(address):
server = await asyncio.start_server(
handle_async_connection, *address
)
async with server:
await server.serve_forever()

async def run_async_client(address):
streams = await asyncio.open_connection(*address)
client = AsyncClient(*streams)

async with client.session(1,5,3):
results = [(x, await Client.report_outcome(x))
async for x in Client.request_number(5)]

async with client.session(10,15,12):
async for number in Client.request_number(5):
outcome = await Client.report_outcome(number)
results.append((number, outcome))

_,writer = streams
writer.close()
await writer.wait_closed()

return results


def main():
address = ('127.0.0.1', 1234)
server_thread = Thread(
target=run_server, args=(address,), daemon=True
)
server_thread.start()

results = run_client(address)
for number,outcome in results:
print(f'Client: {number} is {outcome}')


async def main_async():
address = ('127.0.0.1',70)

server = run_async_server(address)
asyncio.create_task(server)

results = await run_async_client(address)
for number, outcome in results:
print(f'Client: {number} is {outcome}')


asyncio.run(main_async())

62. 结合线程和协程,将代码顺利迁移到 asynio

  • 在Python的asyncio模块中,有几个常用的函数,用于协程的管理和执行。以下是一些常见的asyncio函数:

asyncio.create_task(coroutine, *, name=None):用于创建一个任务来运行指定的协程。

asyncio.gather(*coroutines, loop=None, return_exceptions=False):将多个协程聚合为一个协程,同时并行运行这些协程,并且等待它们全部完成。

asyncio.wait_for(fut, timeout, loop=None):等待一个Future,直到它完成或者发生超时,返回Future的结果或者抛出一个超时异常。

asyncio.sleep(delay, result=None, *, loop=None):暂停执行当前协程,让出CPU给其他协程执行,并在指定的时间后恢复执行当前协程。

asyncio.run(coro, *, debug=False):运行一个协程,通常用于启动整个asyncio应用程序。

这些函数是asyncio中常用的函数之一,在编写异步Python代码时经常会用到它们。

  • asyncio 模块的事件循环提供了一个返回 awaitable 对象的 run_in_executor 方法,它能够使协程把同步函数放在线程池执行器(ThreadPoolExector)中
  • run_until_complete 方法用来运行协程等待结束, run_coroutine_threadsafe 类似。只是后者跨线程,前者是在用一个线程里。

63. 让 asynio 的事件循环保持畅通,以便进一步提升程序的响应能力

  • 把系统调用(包括阻塞式的 I/O 以及启动线程等操作)放在协程里面执行,会降低程序的响应能力,增加延迟感
  • 调用 asynio.run 时,可以把 debug 参数设为 True,这样能够知道哪些协程降低了事件循环的反应速度

64. 考虑用 concurrent.futures 实现真正的并行计算

  • 并行计算一种常见方案,是把那些对性能要求比较高的代码用 C 语言重写,C 语言更接近底层硬件,运行速度要比 Python 快。C 扩展还可以启动原生线程,这种线程不受 Python 解释器制约。可以借助 SWIG、CLIF。
  • Python 中内置的 multiprocessing 模块提供了多进程机制,这种机制很容易通过内置的 concurrent.futures 模块来使用。这种方案可以启动许多子进程(child process),这些进程独立于主解释器,有各自的解释器与相应的全局解释器锁。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    import time
    from concurrent.futures import ProcessPoolExecutor

    def gcd(pair):
    a, b = pair
    low = min(a,b)
    for i in range(low, 0 ,-1):
    if a % i == 0 and b % i == 0:
    return i
    assert False, 'Not reachable'

    NUMBERS = [
    (1963309, 2265973), (2030677, 3814172),
    (1551645, 2229620), (2039045, 2020802),
    (1823712, 1924928), (2293129, 1020491),
    (1281238, 2273782), (3823812, 4237281),
    (3812741, 4729139), (1292391, 2123811),
    ]

    def main():
    start = time.time()
    pool = ProcessPoolExecutor(max_workers=2)
    results = list(pool.map(gcd, NUMBERS))
    end = time.time()
    delta = end - start
    print(f'Took {delta:.3f} seconds')

    if __name__ == '__main__':
    main()
  • 然而这样做的开销很大,因为它必须在上级进程与子进程之间做全套的序列化与反序列化处理。
  • 这个方案对那种孤立的而且数据利用度比较高的任务来说,比较合适。指每一部分任务都不需要跟程序里的其他部分共用状态信息。
  • multiprocessing 所提供的一些其他高级功能,共享内存、跨进程的锁、队列以及代理等。

稳定与性能

Python 有很多内置的特性与模块,可以帮我们加固程序代码,让它应付各种各样的状况

65. 合理利用 try/except/else/finally 结构中的每个代码块

  • 完整的 try/except/else/finally 结构。例如要把待处理的数据从文件中读取出来,然后加以处理,最后把结果写回文件之中。在实现这个功能时,可以把读取文件并处理数据的那段代码放进 try 里面,并用 except 块来捕获 try 块可能抛出的某些异常。如果 try 块正常结束,,else 块中把处理结果写回原来的文件,最后程序无论进入了 except 还是 else ,它都会在即将返回之前,先执行 finally 块以清理文件句柄。

66. 考虑用 contextlib 和 with 语句来改写可复用的 try/finally 代码

  • with 语句强调某段代码需要在特殊情景之中执行,下面两段代码等效
    1
    2
    3
    4
    5
    6
    from threading import Lock

    lock = Lock()
    with lock:
    ...

1
2
3
4
5
6
7
from threading import Lock

lock.acquire()
try:
...
finally:
lock.release()
  • 如果想让其他对象和函数也可以用在,也能用在 with 之中,可以用 contextlib 实现,提供了 contextmanager 修饰器。这比普通方法简单,普通方法需要定义新类并实现名为 enterexit 特殊方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    @contextmanager
    def debug_logging(level):
    logger = logging.getLogger()
    old_level = logger.getEffectiveLevel()
    logger.setLevel(level)

    try:
    yield
    finally:
    logger.setLevel(old_level)
    这段代码是一个上下文管理器(context manager),它使用了 Python 中的 contextmanager 装饰器。上下文管理器通常用于管理资源,比如在进入和离开某个特定环境时执行一些操作。在这种情况下,这个上下文管理器用于临时修改 logging 模块的日志级别,以便在特定代码块中启用调试日志记录。

让我们逐行解释这段代码:

@contextmanager: 这是一个装饰器,用于创建上下文管理器。它允许一个函数生成器(generator function)来定义一个支持 with 语句的上下文管理器。

def debug_logging(level): 这是一个定义了上下文管理器的函数。它接受一个参数 level,表示要设置的日志级别。

logger = logging.getLogger(): 这一行获取了根日志记录器。

old_level = logger.getEffectiveLevel(): 这一行保存了当前日志级别,以便在退出代码块时恢复原始的日志级别。

logger.setLevel(level): 这一行将日志级别设置为传入上下文管理器的参数 level

yield: 这个关键字标志着上下文管理器的“中点”。在这里,它将控制权暂时交给调用者,允许在进入和离开上下文时执行特定的操作。

finally: 这是一个关键字,用于定义无论如何都会执行的代码块。在这里,它确保在离开上下文管理器时使日志级别恢复到之前保存的旧级别。

logger.setLevel(old_level): 这行将日志级别重设为之前保存的旧级别。

  • 通过 yield 产生的值,可以由 with 语句之中位于 as 右侧的那个变量所接收。
  • with 结构的另一个好处,也就是允许我们创造一个特殊的情境,并于这个情境交互,保持情境与在情境中所执行操作之间是去耦合的、状态独立的。

67.用 datetime 模块处理本地时间,不要用 time 模块

  • 协调世界时(UTC)是标准的时间表示方法,它不依赖特定时区
  • time 模块本质上仍然要依赖具体的平台而运作,它的行为取决于底层的 C 函数与宿主操作系统之间的协作方式
  • 把 python 内置的 datetime 模块与开发者社群提供的 pytz 模块结合,可以在不同时区之间可靠地转换
  • 在操纵时间数据的过程中,总是应该使用 UTC 时间,只有到最后一步,才需要把它转回当地时间

68. 用 copyreg 实现可靠的 pickle 操作

  • pickle 模块所使用的这种序列化格式本身就没有考虑过安全问题。这种格式会把原有的 python 对象记录下来,让系统可以在稍后予以重建。如果要在不信任的两个人或两个程序之间传递数据,应该使用 json 格式。
  • 如果对象所在的这个类发生了变化(例如增加或删除了某些属性),那么程序在还原旧版数据的时候,可能会出现错误。
  • 把内置的 copyreg 模块与 pickle 模块搭配起来使用,可以让新版的程序兼容旧版的序列化数据。

69. 在需要准确计算的场合,用 decimal 表示相应的数值

  • 在精度要求较高且需要控制舍入方式的场合(例如在计算费用的时候),可以考虑使用 Decimal 类
  • 用小数构造 Decimal 时,如果想保证取值正确,那么一定要把这个数放在 str 字符串里面传递

70. 先分析性能,然后再优化

  • 优化 python 程序之前,一定要先分析它的性能,因为导致程序速度缓慢的真正原因未必与我们想的一样
  • 应该优先考虑使用 cProfile 模块来分析性能,而不要用 profile 模块,因为前者得到的分析结果更加准确
  • 把需要接受性能测试的主函数传给 Profile 对象的 runcall 方法,就可以专门分析出这个体系下面所有函数的调用情况
  • 可以通过 States 对象筛选出我们关心的那些分析结果,从而更为专注地思考如何优化程序

71. 优先考虑用 deque 实现生产者-消费者队列

  • list 类型可以用来实现 FIFO 队列,生产者可以通过 append 添加元素。但是这种方案有一个问题,就是消费者字在用 pop(0) 从队列获取元素时,所花时间会随着队列长度,呈平方式增长
  • 跟 list 不同,内置的 collections 模块之中的 deque 类,无论 append 还是 popleft 所花时间只跟队列长度呈线性关系,而非平方关系,适合 FIFT 队列。
  • FIFO 和 FIFT 队列都是与数据存储和检索有关的概念,通常用于描述不同的队列工作方式。

FIFO 队列表示“先进先出”队列,这意味着最先进入队列的数据项会最先被移出队列。这种队列模型类似于排队买东西,最早排队的人最先被服务。

FIFT 队列则代表“先进先已处理”队列,其中“T”代表着“treated”。这种队列模型指示一旦数据项被处理(即被取出队列),它就会被标记为已处理,不会再次被处理。

FIFO 和 FIFT 队列在不同的情况下可能被用于数据处理和存储中,这取决于特定的需求和系统设计。

72. 考虑用 bisect 搜索已排序的序列

  • 用 index 方法在已经排好顺序的列表之中查找某个值,花费的时间与列表长度成正比,通过 for 循环单纯地不好
  • 应该使用 bisect 模块里面有一个 bisect_left 函数,只需要花费对数级别的his见就可以在有序列表中搜寻某个值,这要比其他方法快

73. 学会使用 heapq 制作优先级队列

  • Python 内置的 heapq 模块可以解决这个问题,因为它能够高效地实现出优先级队列。
  • heapq 模块规定,添加到优先级队列里面的元素必须是可比较的,并且具有自然顺序。
  • Python 内置的 functools 模块里有个名为 total_ordering 的类修饰器,只需要用它来修饰 Book,并把描述关系的 lt 特殊方法定义出来,就可以消除刚才的错误。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    import functools

    @functools.total_ordering
    class Book:
    def __init__(self, title, due_date):
    self.title = title
    self.due_date = due_date

    def __lt__(self, other):
    return self.due_date < other.due_date
  • 优先队列让我们能够按照重要程度来处理元素,而不是必须按照先进先出的顺序处理
  • 另外还有一种能保证线程安全的方案,queue.PriorityQueue 类方法

74. 考虑用 memoryview 与 bytearray 来实现无须拷贝的 bytes 操作

  • Python 内置的 memoryview() 类型来改进,这个类型让程序能够利用 CPython 的缓冲协议(buffer protocol) 高效操纵字节数据。协议属于底层 C 的 API,允许 Python 运行时系统与 C 扩展访问底层的数据缓冲,而 bytes 等实例正是由这种数据缓冲对象所支持。
  • bytearray 相当于可修改的 bytes ,它允许我们修改任意位置上面的内容

测试与调试

python 没有编译器的静态类型检查机制,所以 python 解释器无法确保程序一定能够正常运行。

75. 通过 repr 字符串输出调试信息

  • 把内置类型的值传给 repr,会得到一个能够表示该值的可打印字符串,将这个 repr 字符串传给内置的 eval 函数能够得到原值
  • 给类定义 repr 特殊方法,可以让 print 函数把该类实例的可打印表现形式展现出来。

76. 在 TestCase 子类里验证相关的行为

  • python 内置的 unittest 模块里有个 TestCase 类,我们可以定义它的子类,并在其中编写多个 test 方法,以便分别验证想要测试的每一种行为。TestCase 子类的这些 test 方法名称都必须以 test 这个词开头。
  • Testcase 类还提供了许多辅助方法,例如 assertEqual 辅助方法来确认两个值相等

84. 每一个函数、类与模块都要写 docstring

85. 用包来安排模块,以提供稳固的 API

  • 如果两个包里有同名的模块,或者两个模块里有同名的函数或类,那么后引入的那个会把先引入的覆盖掉。
  • 凡是没有出现在 all 之中的,都不会随着 from mypackage import * 语句引入,这相当于对外部使用者有效隐藏了名字
  • 如果要构建的包比较简单,那就把其中每个模块所对应的源文件都直接放在本包目录下,并在目录里创建一个 init.py 文件

86. 考虑用模块级别的代码配置不同的部署环境

  • 如果环境配置起来特别复杂,那就不要使用 TESTING 这样单纯的 Python 常量,而是可以考虑构建专门的配置文件,并通过 python 的 configparser 模块解析

87. 为自编的模块定义根异常,让调用者能够专门处理与此 API 有关的异常

  • 给模块定义根异常,可以让使用这个模块的 API 用户将他们的代码与这个模块所提供的 API 隔开,以便分别处理其中的错误

88. 用适当的方法打破循环依赖关系

  • 如果两个模块都要在开头引入对方,那就会形成循环依赖关系,这有可能导致程序在启动时崩溃
  • 最好的办法是把这两个模块都要使用的代码重构到整个依赖体系最底层
  • 最简单的方式是动态引入,把 import 语句从模块级别下移到函数或方法里面

89. 重构时考虑通过 warning 提醒开发者 API 已经发生变化

1
2
import warning
warning.warn()
  • 可以通过 python -W error 选项,PYTHONWARNING 。把 API 所发出的警告视为错误
  • 如果程序部署到生产环境,那么可以通过 logging 模块将警告信息定向到日志系统,把程序在运行过程中遇到的警告纳入现有的错误报告机制之中。

90. 考虑通过 typing 做静态分析,以消除 bug

  • 如果刚开始写代码的时候,就想着如何添加类型注解,那可能会拖慢编程速度。所以我们通常应该先写代码
  • 类型提示信息最能发挥作用,是在项目与项目衔接处。
作者

Lookup

发布于

2023-10-15

更新于

2024-03-25

许可协议